package xiaoa.java.jms.activeMQ.Message;

import java.io.IOException;



import java.util.HashMap;
import java.util.Map;

import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;


import xiaoa.java.log.L;

/**
 * 发送者连接池
 * @author xiaoa
 * @date 2017年3月22日 上午10:11:03
 * @version V1.0
 *
 */
public class MessageProducerPool {
	
	
	    // 连接工厂
		private    ActiveMQConnectionFactory  connectionFactory     =  null;
		
		
		/**
		 * 连接
		 */
		private    ActiveMQConnection            defaultConnection  =  null;
		
		/**
		 * url
		 */
		private   String                                  url       =  null ;
		
		
		/**
		 * 保存每个线程的session
		 */
		private ThreadLocal<Map<String, Session>>  sessionThreadLocal   =   new ThreadLocal<Map<String, Session>>();
		
		/**
		 * 保存每个线程的session
		 */
		private ThreadLocal<MessageProducer>  messageProducerThreadLocal   =   new ThreadLocal<MessageProducer>();

		
		/**
		 * 
		 * 构造器
		 * <p>Title: </p>
		 * <p>Description: </p>
		 * @author xiaoa
		 * @param url
		 */
		public   MessageProducerPool(String url) throws Throwable {
			
			if (url == null){
				throw new RuntimeException("url is null");
			}
			
			this.url  = url;
			
			// 实例化连接工厂
			connectionFactory                = new ActiveMQConnectionFactory(this.url);
			
			// 构造从工厂得到连接对象
			defaultConnection          = (ActiveMQConnection)connectionFactory.createConnection();
			
			// 设置监听
			defaultConnection.addTransportListener(new simpleTransportListener());
			
		}
		
		
		/**
		 * 创建一个session
		 * @Title: createSession
		 * @return
		 * @author xiaoa
		 */
		public  Session createSession(boolean  transacted, int acknowledgeMode)throws Throwable{
			
			Map<String, Session>  sessionMap = 	sessionThreadLocal.get();
			
			if (sessionMap == null){
				sessionMap = new HashMap<String, Session>();
				sessionThreadLocal.set(sessionMap);
			}
			
			String sessionName = "session_" + transacted + "_" + acknowledgeMode;
			
			Session  session = sessionMap.get(sessionName);
			
			if (session == null){
				session = defaultConnection.createSession(transacted,acknowledgeMode);
				sessionMap.put(sessionName, session);
			}
			
			return session;
		}
		
		
	
		
		/**
		 * 创建一个生产者
		 * @Title: createMessageProducer
		 * @param queueName
		 * @param transacted         是否需要事务
		 * @param acknowledgeMode    提交方式
		 * @param deliveryMode       是否持久化
		 * @return
		 * @throws Throwable
		 * @author xiaoa
		 */
		public MessageProducer  createMessageProducer(String queueName , boolean transacted, int acknowledgeMode , int deliveryMode )throws Throwable{
			
			// 尝试从线程池中获取
			MessageProducer  producer =  messageProducerThreadLocal.get();
			
			if (producer != null){
				return producer;
			}
		
			// 创建一个会话
			Session session           =   createSession(transacted,acknowledgeMode);
			
			// 创建队列
			Queue   sendQueue         =   session.createQueue(queueName);
			
			// 创建一个生产者
			producer =   session.createProducer(sendQueue);
			
			// 设置是否持久化
			producer.setDeliveryMode(deliveryMode);
			
			messageProducerThreadLocal.set(producer);
			
			return producer;
		}
		
		/**
		 * 发送信息
		 * @Title: sendMessage
		 * @param queueName
		 * @param message
		 * @throws Throwable
		 * @author xiaoa
		 */
		public void sendMessage(String queueName , Message message)throws Throwable{
			
			// 创建一个发送者
			MessageProducer  messageProducer  = createMessageProducer(queueName);
			
			messageProducer.send(message);
			
		}
		
		
		/**
		 * 创建一个message
		 * @Title: createMessage
		 * @return
		 * @throws Throwable
		 * @author xiaoa
		 */
		public Message createMessage(boolean transacted ,int acknowledgeMode)throws Throwable{
			
			// 创建一个session
			Session  session = createSession(transacted, acknowledgeMode);
			
			// 创建一个message
			Message  message = session.createMessage();
			
			return message;
		}
		
		
		/**
		 * 创建一个message
		 * @Title: createMessage
		 * @return
		 * @throws Throwable
		 * @author xiaoa
		 */
		public Message createMessage()throws Throwable{
			
			return createMessage( Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
			
		}

		
		/**
		 * 创建一个生产者
		 * @Title: createMessageProducer
		 * @param queueName
		 * @return
		 * @throws Throwable
		 * @author xiaoa
		 */
		public MessageProducer createMessageProducer(String queueName)throws Throwable{
			
			// 创建默认生产者
			return createMessageProducer(queueName, Boolean.FALSE,Session.AUTO_ACKNOWLEDGE, DeliveryMode.PERSISTENT);
		}
		
		
		/**
		 * 连接监听
		 * @author xiaoa
		 * @date 2017年3月22日 上午9:30:10
		 * @version V1.0
		 *
		 */
		public class simpleTransportListener implements TransportListener {

			@Override
			public void onCommand(Object command) {
				
			}

			@Override
			public void onException(IOException error) {
				
				if (error != null){
					error.printStackTrace();
				}
				
				// 当连接发生异常
				onConnectionException(error);
				
			}

			@Override
			public void transportInterupted() {
				L.info("===============>> 重连");
			}

			@Override
			public void transportResumed() {
				
			}}
		
		
		
		/**
		 * 当连接发生异常
		 * @Title: onConnectionException
		 * @param e
		 * @author xiaoa
		 */
	     public void onConnectionException( Throwable e ){
	    	 
			
		}
	

}
